package com.katesoft.scale4j.rttp.jobs;

import static com.katesoft.scale4j.common.utils.AssertUtility.assertTrue;
import static java.lang.String.format;

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

import javax.sql.DataSource;

import org.perf4j.StopWatch;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;

import com.katesoft.scale4j.common.concurrent.MultiTaskExecutionWaiter;
import com.katesoft.scale4j.common.services.IBeanNameReferences;
import com.katesoft.scale4j.log.LogFactory;
import com.katesoft.scale4j.log.Logger;
import com.katesoft.scale4j.rttp.jobs.jdbc.init.RttpBatchSchemaCreator;
import com.katesoft.scale4j.rttp.jobs.jdbc.init.RttpQuartzSchemaCreator;
import com.katesoft.scale4j.rttp.messaging.jdbc.init.RttpMessageStoreSchemaCreator;

/**
 * Internal class.
 * <p>
 * On {@link #afterPropertiesSet()} this bean looks up the (optional) batch, quartz and message
 * store schema creators in the application context, wraps each enabled one into a
 * {@link DataSourceInitializer} bound to the configured {@link DataSource} and runs the resulting
 * tasks through a {@link MultiTaskExecutionWaiter}.
 * 
 * @author kate2007
 */
public class RttpDatabaseInitializer implements InitializingBean, ApplicationContextAware {
   private final Logger logger = LogFactory.getLogger(getClass());
   private DataSource dataSource;
   private boolean quartzTablesCreationEnabled = true;
   private boolean batchTablesCreationEnabled = true;
   private boolean messageStoreCreationEnabled = true;
   private boolean parallelTablesCreation = false;
   private ApplicationContext applicationContext;

   /**
    * Collects the schema creators registered in the application context and executes the enabled
    * ones.
    * <p>
    * At most one bean of each creator type is allowed; a missing creator simply means that the
    * corresponding schema is skipped.
    * 
    * @throws IOException
    *            declared for callers; propagated from the task execution if raised there.
    * @throws ExecutionException
    *            declared for callers; propagated from the task execution if raised there.
    */
   @Override
   public void afterPropertiesSet() throws IOException, ExecutionException {
      Map<String, RttpBatchSchemaCreator> batchCreators = applicationContext
               .getBeansOfType(RttpBatchSchemaCreator.class);
      Map<String, RttpQuartzSchemaCreator> quartzCreators = applicationContext
               .getBeansOfType(RttpQuartzSchemaCreator.class);
      Map<String, RttpMessageStoreSchemaCreator> messageStoreCreators = applicationContext
               .getBeansOfType(RttpMessageStoreSchemaCreator.class);
      // each lookup is validated against its OWN map, so a duplicate quartz or message store
      // creator is reported instead of being silently resolved to an arbitrary bean.
      RttpBatchSchemaCreator batchCreator = singleOrNull(batchCreators, "RttpBatchSchemaCreators");
      RttpQuartzSchemaCreator quartzCreator = singleOrNull(quartzCreators,
               "RttpQuartzSchemaCreators");
      RttpMessageStoreSchemaCreator messageStoreCreator = singleOrNull(messageStoreCreators,
               "RttpMessageStoreSchemaCreator");
      // insertion-ordered set: batch, then quartz, then message store.
      LinkedHashSet<Callable<?>> tasks = new LinkedHashSet<Callable<?>>();
      if (batchCreator != null && batchTablesCreationEnabled) {
         DataSourceInitializer initializer = new DataSourceInitializer();
         initializer.setDatabasePopulator(batchCreator);
         tasks.add(createTask(initializer));
      }
      if (quartzCreator != null && quartzTablesCreationEnabled) {
         DataSourceInitializer initializer = new DataSourceInitializer();
         initializer.setDatabasePopulator(quartzCreator);
         tasks.add(createTask(initializer));
      }
      if (messageStoreCreator != null && messageStoreCreationEnabled) {
         DataSourceInitializer initializer = new DataSourceInitializer();
         initializer.setDatabasePopulator(messageStoreCreator);
         tasks.add(createTask(initializer));
      }
      StopWatch stopWatch = new StopWatch("rttp.db.scheduler.initializer");
      try {
         // NOTE(review): the second argument presumably switches between sequential and parallel
         // execution - confirm against MultiTaskExecutionWaiter.
         MultiTaskExecutionWaiter executionWaiter = new MultiTaskExecutionWaiter(tasks,
                  parallelTablesCreation);
         executionWaiter.await();
      } finally {
         // the elapsed time is logged even when the execution fails.
         stopWatch.stop();
         logger.info("database schema created in %s milisecs", stopWatch.getElapsedTime());
      }
   }

   /**
    * Asserts that the given bean map holds at most one entry and returns that entry.
    * 
    * @param beans
    *           beans of one creator type keyed by bean name.
    * @param label
    *           creator type label used in the assertion message.
    * @return the only bean in the map or <code>null</code> if the map is empty.
    */
   private static <T> T singleOrNull(Map<String, T> beans, String label) {
      assertTrue(beans.size() <= 1,
               format("there are > 1 %s %s which is not allowed", label, beans.keySet()));
      return beans.isEmpty() ? null : beans.values().iterator().next();
   }

   /**
    * Binds the initializer to the configured data source and wraps its initialization into a
    * task.
    * 
    * @param dataSourceInitializer
    *           initializer with the database populator already set.
    * @return task that calls {@link DataSourceInitializer#afterPropertiesSet()} when executed.
    */
   private Callable<Void> createTask(final DataSourceInitializer dataSourceInitializer) {
      dataSourceInitializer.setDataSource(dataSource);
      return new Callable<Void>() {
         @Override
         public Void call() throws Exception {
            dataSourceInitializer.afterPropertiesSet();
            return null;
         }

         @Override
         public String toString() {
            return format("callable[%s]", dataSourceInitializer.toString());
         }
      };
   }

   /**
    * this method allows to disable quartz schema creation.
    * 
    * @param enabled
    *           create flag.
    */
   @Required
   public void setQuartzTablesCreationEnabled(boolean enabled) {
      this.quartzTablesCreationEnabled = enabled;
   }

   /**
    * this method allows to disable batch schema creation.
    * 
    * @param enabled
    *           create flag.
    */
   @Required
   public void setBatchTablesCreationEnabled(boolean enabled) {
      this.batchTablesCreationEnabled = enabled;
   }

   /**
    * this method allows to disable message store schema creation.
    * 
    * @param enabled
    *           create flag.
    */
   @Required
   public void setMessageStoreCreationEnabled(boolean enabled) {
      this.messageStoreCreationEnabled = enabled;
   }

   /**
    * this method allows to create quartz/spring batch schema creation in parallel(can save some
    * time).
    * <p/>
    * Note: some databases may not support this in memory mode.
    * 
    * @param parallelTablesCreation
    *           create quartz/spring batch schema in parallel flag.
    */
   public void setParallelTablesCreation(boolean parallelTablesCreation) {
      this.parallelTablesCreation = parallelTablesCreation;
   }

   /**
    * Sets the data source every schema creation task is executed against.
    * 
    * @param dataSource
    *           data source qualified as {@link IBeanNameReferences#ACTUAL_DATASOURCE}.
    */
   @Autowired
   @Required
   @Qualifier(value = IBeanNameReferences.ACTUAL_DATASOURCE)
   public void setDataSource(DataSource dataSource) {
      this.dataSource = dataSource;
   }

   /**
    * Stores the application context used to look up the schema creator beans.
    * 
    * @param applicationContext
    *           owning application context.
    */
   @Override
   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
      this.applicationContext = applicationContext;
   }
}
